[python] Use pypaimon with Google Cloud Storage #7769

Open

larssk wants to merge 3 commits into apache:master from larssk:ls/pypaimon-gcs

Conversation

larssk commented May 5, 2026

Purpose

Use Google Cloud Storage (GCS) as the warehouse / object storage for pypaimon (without Java dependencies). This PR adds GCS handling to PyArrowFileIO.

Changes

1. Scheme dispatch (pyarrow_file_io.py)

Added a gs branch in __init__ alongside the existing s3 and hdfs branches:

elif scheme == "gs":
    self.filesystem = self._initialize_gcs_fs()

2. _initialize_gcs_fs() method (pyarrow_file_io.py)

Uses pyarrow.fs.GcsFileSystem, which is already available via pyarrow[gcs].

With no arguments, GcsFileSystem picks up credentials automatically via Application Default Credentials (ADC): GOOGLE_APPLICATION_CREDENTIALS, the GCP metadata server, or Workload Identity on GKE/GCE. Three optional properties allow explicit credential passing:

Property                     Description
gcs.access-token             OAuth2 access token. If unset, ADC is used.
gcs.access-token.expiration  ISO 8601 expiry datetime for the token.
gcs.project-id               GCP project ID for billing/quota.

def _initialize_gcs_fs(self) -> FileSystem:
    access_token = self._get_property("gcs.access-token")
    token_expiry = self._get_property("gcs.access-token.expiration")
    project_id = self._get_property("gcs.project-id")

    kwargs = {}
    if access_token:
        from datetime import datetime
        kwargs["access_token"] = access_token
        kwargs["credential_token_expiration"] = (
            datetime.fromisoformat(token_expiry) if token_expiry
            else datetime(9999, 12, 31)
        )
    if project_id:
        kwargs["project_id"] = project_id

    # With no kwargs, GcsFileSystem uses ADC automatically
    return pafs.GcsFileSystem(**kwargs)
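For reference, constructing the filesystem directly through pyarrow looks like this (an illustrative sketch; the token, expiry, and project values are placeholders, and the keyword arguments are pyarrow's own GcsFileSystem parameters rather than pypaimon properties):

from datetime import datetime
import pyarrow.fs as pafs

# ADC path: with no arguments, credentials are resolved via
# GOOGLE_APPLICATION_CREDENTIALS, the metadata server, or Workload Identity.
adc_fs = pafs.GcsFileSystem()

# Explicit-token path: pyarrow requires an expiration whenever an access
# token is passed, which is why the method above defaults it when unset.
token_fs = pafs.GcsFileSystem(
    access_token="ya29.EXAMPLE",  # placeholder OAuth2 token
    credential_token_expiration=datetime.fromisoformat("2026-05-05T12:00:00"),
    project_id="my-project",  # placeholder GCP project ID
)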

3. Path fix in to_filesystem_path() (pyarrow_file_io.py)

to_filesystem_path() had a catch-all for non-S3/non-HDFS schemes that returned only uri.path — e.g. for gs://my-bucket/data/table it returned /data/table, stripping the bucket name. GcsFileSystem (like S3FileSystem) expects paths in bucket/key form without a leading slash.

Added a GcsFileSystem branch mirroring the existing S3 logic:

from pyarrow.fs import GcsFileSystem
if isinstance(self.filesystem, GcsFileSystem):
    # parsed and normalized_path are computed earlier in to_filesystem_path()
    if parsed.scheme and parsed.netloc:
        path_part = normalized_path.lstrip('/')
        return f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc
    return str(path)
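In isolation, the conversion described above amounts to the following (a minimal standard-library sketch; to_gcs_path is a hypothetical helper, not the PR's method, and slash normalization is omitted):

from urllib.parse import urlparse

def to_gcs_path(uri: str) -> str:
    # Hypothetical helper illustrating bucket/key conversion for GCS URIs.
    parsed = urlparse(uri)
    path_part = parsed.path.lstrip("/")
    return f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc

assert to_gcs_path("gs://my-bucket/data/table") == "my-bucket/data/table"
assert to_gcs_path("gs://my-bucket") == "my-bucket"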

4. GcsOptions class (config.py)

Added a GcsOptions class alongside the existing S3Options documenting the three new properties.

class GcsOptions:
    GCS_ACCESS_TOKEN = ConfigOptions.key("gcs.access-token").string_type().no_default_value().with_description(
        "GCS access token. If not set, ADC (Application Default Credentials) is used automatically.")
    ...
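The elided entries presumably mirror the first one; a sketch of how they might look (attribute names are guesses, descriptions are taken from the property table above):

    # Hypothetical continuation of GcsOptions; attribute names are guesses.
    GCS_ACCESS_TOKEN_EXPIRATION = ConfigOptions.key("gcs.access-token.expiration").string_type() \
        .no_default_value().with_description("ISO 8601 expiry datetime for the access token.")
    GCS_PROJECT_ID = ConfigOptions.key("gcs.project-id").string_type() \
        .no_default_value().with_description("GCP project ID for billing/quota.")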

Linked issue

#7768

Tests

Unit tests — pypaimon/tests/file_io_test.py

A new GCSFileIOPathTest class is added to the existing file_io_test.py, following the same pattern as the existing S3/OSS path conversion tests. These tests require no GCS credentials.

Test                            What it checks
test_gcs_filesystem_type        PyArrowFileIO("gs://...") produces a pafs.GcsFileSystem instance
test_gcs_path_conversion        gs://bucket/key maps to bucket/key (bucket prepended, no leading slash)
test_gcs_path_bucket_only       gs://bucket with no path component maps to bucket
test_gcs_path_normalization     Consecutive slashes in the path are collapsed (e.g. gs://bucket///a///b → bucket/a/b)
test_gcs_path_idempotency       Already-converted bucket/key paths pass through unchanged
test_gcs_path_no_leading_slash  to_filesystem_path never returns a path starting with / for any GCS URI

python -m pytest pypaimon/tests/file_io_test.py::GCSFileIOPathTest -v
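For illustration, one of these path tests might look like the following (a sketch; the import path for PyArrowFileIO is a guess based on the file names mentioned above):

import unittest

import pyarrow.fs as pafs
# Import path is a guess; adjust to wherever pyarrow_file_io.py lives.
from pypaimon.common.pyarrow_file_io import PyArrowFileIO

class GCSFileIOPathTestSketch(unittest.TestCase):
    def test_gcs_path_conversion(self):
        # No GCS credentials are needed to exercise the path logic.
        io = PyArrowFileIO("gs://bucket/key")
        self.assertIsInstance(io.filesystem, pafs.GcsFileSystem)
        self.assertEqual(io.to_filesystem_path("gs://bucket/key"), "bucket/key")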

Integration tests — pypaimon/tests/gcs_file_io_test.py

A new GCSFileIOTest class is added following the pattern of oss_file_io_test.py. All tests are skipped automatically when GCS_TEST_BUCKET is not set. Credentials are picked up via ADC — no explicit credential properties are required.

Test                                          What it checks
test_gcs_filesystem_type                      Live PyArrowFileIO with gs:// uses GcsFileSystem
test_exists                                   exists() returns False for non-existent paths; get_file_status() raises FileNotFoundError
test_write_and_read_file                      write_file() / read_file_utf8() round-trip
test_write_file_overwrite                     write_file(..., overwrite=False) raises FileExistsError; overwrite=True replaces content
test_new_input_stream_read                    new_output_stream() / new_input_stream() binary round-trip; FileNotFoundError for missing file
test_get_file_status_directory                get_file_status() returns FileType.Directory for a directory
test_get_file_status_file                     get_file_status() returns FileType.File with non-None size
test_delete_returns_false_when_not_exists     delete() returns False for non-existent file/directory
test_delete_non_empty_directory_raises_error  delete(..., recursive=False) raises OSError for non-empty directory
test_rename_returns_false_when_dst_exists     rename() returns False when destination already exists
test_copy_file                                copy_file(..., overwrite=False) raises FileExistsError; overwrite=True replaces content
test_try_to_write_atomic                      try_to_write_atomic() writes content and returns True on success
test_mkdirs_raises_error_when_path_is_file    mkdirs() raises FileExistsError when path is an existing file

# Skips all tests (no bucket configured)
python -m pytest pypaimon/tests/gcs_file_io_test.py -v

# Runs all tests against a real GCS bucket using ADC
GCS_TEST_BUCKET=my-bucket python -m pytest pypaimon/tests/gcs_file_io_test.py -v
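The environment-variable gate can be implemented with a standard unittest skip decorator (a sketch of the pattern, not the PR's exact code):

import os
import unittest

GCS_TEST_BUCKET = os.environ.get("GCS_TEST_BUCKET")

@unittest.skipUnless(GCS_TEST_BUCKET, "GCS_TEST_BUCKET not set; skipping GCS integration tests")
class GCSFileIOIntegrationSketch(unittest.TestCase):
    def test_bucket_configured(self):
        # Real tests would exercise PyArrowFileIO against the configured bucket.
        self.assertTrue(GCS_TEST_BUCKET)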
